# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import alerts
import json
import random
import string
import unittest
import webtest

from google.appengine.api import memcache
from google.appengine.ext import testbed


class AlertsTest(unittest.TestCase):

  def setUp(self):
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_all_stubs()
    self.testapp = webtest.TestApp(alerts.app)

  def tearDown(self):
    self.testbed.deactivate()

  def check_json_headers(self, res):
    self.assertEqual(res.content_type, 'application/json')
    # This is necessary for cross-site tools to retrieve alerts
    self.assertEqual(res.headers['access-control-allow-origin'], '*')

  def test_get_no_data_cached(self):
    res = self.testapp.get('/alerts')
    self.check_json_headers(res)
    self.assertEqual(res.body, '{}')

  def test_happy_path(self):
    # Set it.
    params = {'content': '{"alerts": ["hello", "world"]}'}
    self.testapp.post('/alerts', params)

    def happy_path():
      # Get it.
      res = self.testapp.get('/alerts')
      self.check_json_headers(res)
      data = json.loads(res.body)

      # The server should have stuck a 'date' on there.
      self.assertTrue('date' in data)
      self.assertEqual(type(data['date']), int)

      self.assertEqual(data['alerts'], ['hello', 'world'])

    happy_path()

    memcache.Client().flush_all()

    happy_path()

  def test_post_invalid_data_not_reflected(self):
    params = {'content': '[{"this is not valid JSON'}
    self.testapp.post('/alerts', params, status=400)
    res = self.testapp.get('/alerts')
    self.assertEqual(res.body, '{}')

  def test_post_invalid_data_does_not_overwrite_valid_data(self):
    # Populate the cache with something valid
    params = {'content': '{"alerts": "everything is OK"}'}
    self.testapp.post('/alerts', params)
    self.testapp.post('/alerts', {'content': 'woozlwuzl'}, status=400)
    res = self.testapp.get('/alerts')
    self.check_json_headers(res)
    data = json.loads(res.body)
    self.assertEqual(data['alerts'], 'everything is OK')

  def test_alerts_jsons_are_stored_in_history(self):
    test_alerts1 = {'alerts': ['hello', 'world', '1']}
    test_alerts2 = {'alerts': ['hello', 'world', '2']}
    self.testapp.post('/alerts', {'content': json.dumps(test_alerts1)})
    self.testapp.post('/alerts', {'content': json.dumps(test_alerts2)})
    alerts_query = alerts.AlertsJSON.query().order(alerts.AlertsJSON.date)
    stored_alerts = alerts_query.fetch(limit=3)
    self.assertEqual(2, len(stored_alerts))
    self.assertEqual(stored_alerts[0].type, 'alerts')
    self.assertEqual(stored_alerts[1].type, 'alerts')
    stored_alerts1 = json.loads(stored_alerts[0].json)
    stored_alerts2 = json.loads(stored_alerts[1].json)
    self.assertEqual(test_alerts1['alerts'], stored_alerts1['alerts'])
    self.assertEqual(test_alerts2['alerts'], stored_alerts2['alerts'])
    self.assertTrue('date' in stored_alerts1)
    self.assertTrue('date' in stored_alerts2)
    self.assertEqual(type(stored_alerts1['date']), int)
    self.assertEqual(type(stored_alerts2['date']), int)

  def test_repeating_alerts_are_not_stored_to_history(self):
    test_alerts = {'alerts': ['hello', 'world']}
    self.testapp.post('/alerts', {'content': json.dumps(test_alerts)})
    test_alerts['last_builder_info'] = {'some': 'info'}
    self.testapp.post('/alerts', {'content': json.dumps(test_alerts)})
    stored_alerts = alerts.AlertsJSON.query().fetch(limit=2)
    self.assertEqual(1, len(stored_alerts))

  def test_alerts_jsons_are_retrieved_from_history(self):
    test_alert = {'alerts': ['hello', 'world', '1']}
    alerts.AlertsJSON(json=json.dumps(test_alert), type='alerts').put()
    response = self.testapp.get('/alerts-history')
    self.assertEqual(response.status_int, 200)
    self.assertEqual(response.content_type, 'application/json')
    parsed_json = json.loads(response.normal_body)
    self.assertEqual(len(parsed_json['history']), 1)

    entry_id = parsed_json['history'][0]
    response = self.testapp.get('/alerts-history/%s' % entry_id)
    self.assertEqual(response.status_int, 200)
    self.assertEqual(response.content_type, 'application/json')
    parsed_json = json.loads(response.normal_body)
    self.assertEqual(parsed_json['alerts'], test_alert['alerts'])

  def test_provides_login_url(self):
    response = self.testapp.get('/alerts-history')
    self.assertIn('login-url', response)

  def test_invalid_keys_return_400(self):
    response = self.testapp.get('/alerts-history/kjhg$%T',
                                expect_errors=True)
    self.assertEqual(response.status_int, 400)
    self.assertEqual(response.content_type, 'application/json')

  def test_non_existing_keys_return_404(self):
    response = self.testapp.get('/alerts-history/5348024557502464',
                                expect_errors=True)
    self.assertEqual(response.status_int, 404)
    self.assertEqual(response.content_type, 'application/json')

  def test_internal_alerts_can_only_retrieved_by_internal_users(self):
    test_alert = {'alerts': ['hello', 'world', '1']}
    internal_alert = alerts.AlertsJSON(json=json.dumps(test_alert),
                                       type='internal-alerts')
    internal_alert_key = internal_alert.put().integer_id()

    # No signed-in user.
    response = self.testapp.get('/alerts-history/%s' % internal_alert_key,
                                expect_errors=True)
    self.assertEqual(response.status_int, 404)
    self.assertEqual(response.content_type, 'application/json')
    parsed_json = json.loads(response.normal_body)
    self.assertNotIn('alerts', parsed_json)

    # Non-internal user.
    self.testbed.setup_env(USER_EMAIL='test@example.com', USER_ID='1',
                           USER_IS_ADMIN='1', overwrite=True)
    response = self.testapp.get('/alerts-history/%s' % internal_alert_key,
                                expect_errors=True)
    self.assertEqual(response.status_int, 404)
    self.assertEqual(response.content_type, 'application/json')
    parsed_json = json.loads(response.normal_body)
    self.assertNotIn('alerts', parsed_json)

  def test_lists_internal_alerts_to_internal_users_only(self):
    test_alert = {'alerts': ['hello', 'world', '1']}
    alerts.AlertsJSON(json=json.dumps(test_alert),
                      type='internal-alerts').put()

    # No signed-in user.
    response = self.testapp.get('/alerts-history')
    self.assertEqual(response.status_int, 200)
    self.assertEqual(response.content_type, 'application/json')
    response_json = json.loads(response.normal_body)
    self.assertEqual(len(response_json['history']), 0)

    # Non-internal user.
    self.testbed.setup_env(USER_EMAIL='test@example.com', USER_ID='1',
                           USER_IS_ADMIN='1', overwrite=True)
    response = self.testapp.get('/alerts-history')
    self.assertEqual(response.status_int, 200)
    self.assertEqual(response.content_type, 'application/json')
    response_json = json.loads(response.normal_body)
    self.assertEqual(len(response_json['history']), 0)

    # Internal user.
    self.testbed.setup_env(USER_EMAIL='test@google.com', USER_ID='2',
                           USER_IS_ADMIN='1', overwrite=True)
    response = self.testapp.get('/alerts-history')
    self.assertEqual(response.status_int, 200)
    self.assertEqual(response.content_type, 'application/json')
    response_json = json.loads(response.normal_body)
    self.assertEqual(len(response_json['history']), 1)

  def test_returned_alerts_from_history_are_paged(self):
    for i in range(20):
      test_alert = {'alerts': ['hello', 'world', i]}
      alerts.AlertsJSON(json=json.dumps(test_alert), type='alerts').put()

    response = self.testapp.get('/alerts-history?limit=15')
    self.assertEqual(response.status_int, 200)
    self.assertEqual(response.content_type, 'application/json')
    response_json = json.loads(response.normal_body)
    self.assertEqual(len(response_json['history']), 15)
    self.assertEqual(response_json['has_more'], True)

    url = '/alerts-history?limit=15&cursor=%s' % response_json['cursor']
    response = self.testapp.get(url)
    self.assertEqual(response.status_int, 200)
    self.assertEqual(response.content_type, 'application/json')
    response_json = json.loads(response.normal_body)
    self.assertEqual(len(response_json['history']), 5)
    self.assertEqual(response_json['has_more'], False)

  def test_large_number_of_alerts(self):
    # This generates ~2.5MB of JSON that compresses to ~750K. Real
    # data compresses about 6x better.
    random.seed(0xf00f00)
    put_alerts = self.generate_fake_alerts(4000)

    params = {'content': json.dumps(put_alerts)}
    self.testapp.post('/alerts', params)

    res = self.testapp.get('/alerts')
    got_alerts = json.loads(res.body)
    self.assertEquals(got_alerts['alerts'], put_alerts['alerts'])

  def test_alerts_too_big_for_memcache(self):
    random.seed(0xf00f00)
    big_alerts = self.generate_fake_alerts(10000)
    content = json.dumps(big_alerts)
    self.assertTrue(len(content) > alerts.AlertsHandler.MAX_JSON_SIZE)

    params = {'content': content}

    self.testapp.post('/alerts', params)
    res = self.testapp.get('/alerts')
    got_alerts = json.loads(res.body)
    self.assertEquals(got_alerts['alerts'], big_alerts['alerts'])
    self.assertEquals(memcache.get(alerts.AlertsHandler.ALERTS_TYPE), None)

  def test_update_alerts_when_last_was_stored_in_gcs(self):
    random.seed(0xf00f00)
    alerts_1 = self.generate_fake_alerts(10000)
    content = json.dumps(alerts_1)
    self.assertTrue(len(content) > alerts.AlertsHandler.MAX_JSON_SIZE)
    params = {'content': content}
    self.testapp.post('/alerts', params)

    test_alerts2 = {'alerts': ['hello', 'world', '1']}
    self.testapp.post('/alerts', {'content': json.dumps(test_alerts2)})
    alerts_query = alerts.AlertsJSON.query().order(alerts.AlertsJSON.date)
    stored_alerts = alerts_query.fetch(limit=3)
    self.assertEqual(2, len(stored_alerts))
    self.assertEqual(stored_alerts[0].type, 'alerts')
    self.assertEqual(stored_alerts[1].type, 'alerts')
    self.assertTrue(stored_alerts[0].use_gcs)
    stored_alerts2 = json.loads(stored_alerts[1].json)
    self.assertEqual(test_alerts2['alerts'], stored_alerts2['alerts'])
    self.assertTrue('date' in stored_alerts2)
    self.assertEqual(type(stored_alerts2['date']), int)


  def generate_fake_alerts(self, n):
    return {'alerts': [self.generate_fake_alert() for _ in range(n)]}

  @staticmethod
  def generate_fake_alert():
    # fake labels
    labels = [['', 'last_', 'latest_', 'failing_', 'passing_'],
              ['build', 'builder', 'revision'],
              ['', 's', '_url', '_reason', '_name']]

    def label():
      return string.join(map(random.choice, labels), '')

    # fake values
    def time():
      return random.randint(1407976107614, 1408076107614) / 101.0

    def build():
      return random.randint(2737, 2894)

    def revision():
      return random.randint(288849, 289415)

    tests = [['Activity', 'Async', 'Browser', 'Content', 'Input'],
             ['Manager', 'Card', 'Sandbox', 'Container'],
             ['Test.'],
             ['', 'Basic', 'Empty', 'More'],
             ['Mouse', 'App', 'Selection', 'Network', 'Grab'],
             ['Input', 'Click', 'Failure', 'Capture']]

    def test():
      return string.join(map(random.choice, tests), '')

    def literal_array():
      generator = random.choice([time, build, revision])
      return [generator() for _ in range(random.randint(0, 10))]

    def literal_map():
      generators = [build, revision, test, literal_array]
      obj = {}
      for _ in range(random.randint(3, 9)):
        obj[label()] = random.choice(generators)()
      return obj

    def value():
      generators = [time, build, revision, test, literal_array,
                    literal_map]
      return random.choice(generators)()

    alert = {}
    for _ in range(random.randint(6, 9)):
      alert[label()] = value()
    return alert
